[Flink] Kvscan flink integration#3383
Conversation
ddb782e to
ecdb181
Compare
| Enable the feature by setting `client.scanner.kv.server-side.enabled = true` on the table or as a SQL hint: | ||
|
|
||
| - This is a **bounded** read. The source finishes once all buckets have been drained and does not continue reading the change-log. | ||
| - On task restart, each bucket is rescanned from scratch. Progress within a scan session is not checkpointed, because an expired or invalidated server-side session cannot be resumed from a mid-point. |
There was a problem hiding this comment.
On task restart, each bucket is scanned again from the beginning. Progress within an active scan session is not checkpointed because expired or invalidated server-side sessions cannot be resumed from an intermediate position.
|
|
||
| - This is a **bounded** read. The source finishes once all buckets have been drained and does not continue reading the change-log. | ||
| - On task restart, each bucket is rescanned from scratch. Progress within a scan session is not checkpointed, because an expired or invalidated server-side session cannot be resumed from a mid-point. | ||
| - The feature is disabled by default (`false`). Without it, unbounded (streaming) reads on primary-key tables work as usual; bounded reads require the data-lake integration to be enabled. |
There was a problem hiding this comment.
When disabled, unbounded (streaming) reads on primary-key tables continue to work as usual. Bounded reads require data-lake integration unless server-side KV scanning is enabled.
| SELECT * FROM pk_table; | ||
| ``` | ||
|
|
||
| You can also enable the feature dynamically without storing it in the table metadata: |
There was a problem hiding this comment.
You can also enable server-side scanning dynamically without storing the option in table metadata:
| ``` | ||
|
|
||
| ### Limit Read | ||
| The Fluss source supports limiting reads for both primary-key tables and log tables, making it convenient to preview the latest `N` records in a table. |
There was a problem hiding this comment.
making it easy to preview the latest N records in a table.
|
|
||
| Fluss can perform a bounded full-table scan on a primary-key table directly via the server-side KV scan API. | ||
|
|
||
| Enable the feature by setting `client.scanner.kv.server-side.enabled = true` on the table or as a SQL hint: |
There was a problem hiding this comment.
Enable this feature by setting client.scanner.kv.server-side.enabled = true in the table options or by using a SQL hint.
|
|
||
| Enable the feature by setting `client.scanner.kv.server-side.enabled = true` on the table or as a SQL hint: | ||
|
|
||
| - This is a **bounded** read. The source finishes once all buckets have been drained and does not continue reading the change-log. |
There was a problem hiding this comment.
The source finishes after all buckets have been scanned and does not continue consuming the change log.
| | scan.partition.discovery.interval | Duration | 1min | The time interval for the Fluss source to discover the new partitions for partitioned table while scanning. A non-positive value disables the partition discovery. The default value is 1 minute. Currently, since Fluss Admin#listPartitions(TablePath tablePath) requires a large number of requests to ZooKeeper in server, this option cannot be set too small, as a small value would cause frequent requests and increase server load. In the future, once list partitions is optimized, the default value of this parameter can be reduced. | | ||
| | scan.kv.snapshot.lease.id | String | UUID | The lease ID used to protect acquired KV snapshots from deletion. If specified, the snapshots will be retained until either the consumer finishes processing all of them or the lease duration expires. By default, this value is set to a randomly generated UUID string if not explicitly provided. | | ||
| | scan.kv.snapshot.lease.duration | Duration | 1day | The time period how long to wait before expiring the kv snapshot lease to avoid kv snapshot blocking to delete. | | ||
| | client.scanner.kv.server-side.enabled | Boolean | false | Master switch for using the server-side KV scan (FIP-17) in bounded reads of primary-key tables when no KV snapshot file is available. When false (default), bounded primary-key reads fall back to the prior behavior (log-only when lake is enabled, or fail when lake is disabled). See [Full Scan of Primary Key Tables](engine-flink/reads.md#full-scan-of-primary-key-tables) for details. | |
There was a problem hiding this comment.
Enables server-side KV scanning (FIP-17) for bounded reads on primary-key tables when no KV snapshot file is available. When disabled (default), bounded reads fall back to the previous behavior: read from the log when data-lake integration is enabled, or fail when it is disabled.
#3126 extended the java client to support kvscan for the live rocksdb table.
This PR integrates that functionality inside the flink connector
Introduces
KvBatchSplitandKvBatchSplitStateas a new split type in the Flink source, enabling the connector to perform bounded full-table scans on primary-key tables via the server-side KV scan API (FIP-17), rather than reading from snapshots.